{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": 7,
      "metadata": {
        "id": "kQSTgdzDO0yX"
      },
      "outputs": [],
      "source": [
        "import math\n",
        "import numpy as np\n",
        "import pandas as pd\n",
        "import matplotlib.pyplot as plt\n",
        "import scipy\n",
        "from scipy import signal\n",
        "import os\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ZgKBEzPs7Na1"
      },
      "source": [
        "# Functions"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "pKdV9-AIu6nT"
      },
      "source": [
        "## Read ppg signal csv"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 8,
      "metadata": {
        "id": "uhgW58L9u9Qn"
      },
      "outputs": [],
      "source": [
        "def funcReadSignal(path_ppg):\n",
        "  df_ppg = pd.read_csv(path_ppg)\n",
        "  signal_ppg = df_ppg['PPG'] # PPG value\n",
        "  return signal_ppg"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ZL1w5O-Y7Ra3"
      },
      "source": [
        "## Graph PPG Signal"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 27,
      "metadata": {
        "id": "NN0A26W-7Q_O"
      },
      "outputs": [],
      "source": [
        "def funcVisualizeSignal(ppg_signal, fs, label, scatterX = 0, scatterY = 0, scatter = False): # scatterX in samples --> generates times array = samples/fs\n",
        "  L = len(ppg_signal)\n",
        "  t = np.linspace(0,L/fs,L)\n",
        "  plt.figure(figsize=(50,10))\n",
        "  plt.plot(t,ppg_signal, 'y')\n",
        "  if (scatter == True):\n",
        "    plt.scatter(scatterX/fs, scatterY)\n",
        "  plt.ylabel('Amplitude', fontsize=14)\n",
        "  plt.xlabel('Time [s]', fontsize=14)\n",
        "  plt.title(label, fontsize=14)\n",
        "  plt.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FHDtdWcE7egV"
      },
      "source": [
        "## Filters"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qTZ5WrUz7roL"
      },
      "source": [
        "### BP Filter"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 10,
      "metadata": {
        "id": "sSHSepz37kUM"
      },
      "outputs": [],
      "source": [
        "def funcBPFilter(ppg_signal, order, f1, f2, fs):\n",
        "  fc = [f1,f2]\n",
        "  [b,a] = signal.butter(order, fc, btype = 'bandpass', analog = False, output = 'ba', fs = fs) # Butterworth Nth-order digital IIR filter design: returns the filter coefficients (Numerator (b) and denominator (a)) of the polynomials of the IIR filter\n",
        "  signalBP = signal.filtfilt(b, a, ppg_signal) # Applies a linear digital filter forward and backward to the signal: the combined filter has zero phase and a filter order twice that of the original\n",
        "  return signalBP"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uT96dxoK83nE"
      },
      "source": [
        "### LP Filter"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 11,
      "metadata": {
        "id": "becSl6OT86Ih"
      },
      "outputs": [],
      "source": [
        "def funcLPFilter(ppg_signal, order, fc, fs):\n",
        "  [b,a] = signal.butter(order, fc, btype = 'lowpass', analog = False, output = 'ba', fs = fs)\n",
        "  signalLP = signal.filtfilt(b, a, ppg_signal)\n",
        "  return signalLP"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-7r2NW4C8696"
      },
      "source": [
        "### HP Filter"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 12,
      "metadata": {
        "id": "1k9Wpbh38819"
      },
      "outputs": [],
      "source": [
        "def funcHPFilter(ppg_signal, order, fc, fs):\n",
        "  [b,a] = signal.butter(order, fc, btype = 'highpass', analog = False, output = 'ba', fs = fs)\n",
        "  signalHP = signal.filtfilt(b, a, ppg_signal)\n",
        "  return signalHP"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7kkRENKNOVAf"
      },
      "source": [
        "### Adaptative Filter"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 29,
      "metadata": {
        "id": "G9lgWvYtOYFG"
      },
      "outputs": [],
      "source": [
        "def funcFilter(ppg_signal, fs, HR, order=3):\n",
        "  if HR < 60:\n",
        "    fmin = 0.1\n",
        "    fmax = 1.5\n",
        "  elif HR < 100:\n",
        "    fmin = 0.35\n",
        "    fmax = 2.5\n",
        "  elif HR < 180:\n",
        "    fmin = 0.5\n",
        "    fmax = 3.5\n",
        "  else:\n",
        "    fmin = 1\n",
        "    fmax = 4.5\n",
        "  ppg_signal_HP = funcHPFilter(ppg_signal = ppg_signal, order = order, fc = fmin, fs = fs)\n",
        "  ppg_signal_LP = funcLPFilter(ppg_signal = ppg_signal_HP, order = order, fc = fmax, fs = fs)\n",
        "  return ppg_signal_LP"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "HrSxtABTNPJU"
      },
      "source": [
        "## Estimate HR"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 46,
      "metadata": {
        "id": "zOJmm9EENWaj"
      },
      "outputs": [],
      "source": [
        "def funcFindPosition(array, value):\n",
        "  for i, element in enumerate(array):\n",
        "    if element >= value:\n",
        "      return i\n",
        "\n",
        "def funcEstimateHR(ppg_signal, f1, f2, fs):\n",
        "  ppg_signal_BP = funcBPFilter(ppg_signal=ppg_signal, order=3, f1=f1, f2=f2, fs=fs)\n",
        "  # FFT\n",
        "  fft_result = np.fft.fft(ppg_signal_BP)\n",
        "  fft_freqs = np.fft.fftfreq(len(ppg_signal_BP), 1/fs)\n",
        "  # Find dominant frequency in the FFT that estimates the heart frequency --> 30bpm to 240 bpm = 0.5 to 4 Hz\n",
        "  x_min = funcFindPosition(fft_freqs, 0.5)\n",
        "  x_max = funcFindPosition(fft_freqs, 4)\n",
        "  positive_freqs = fft_freqs[:len(fft_freqs)//2][x_min:x_max]\n",
        "  positive_result = fft_result[:len(fft_result)//2][x_min:x_max]\n",
        "  magnitude_spectrum = np.abs(positive_result)\n",
        "  max_peak_index = np.argmax(magnitude_spectrum)\n",
        "  dominant_freq = positive_freqs[max_peak_index]\n",
        "  heart_rate = dominant_freq * 60  # Transform Hz to bpm\n",
        "\n",
        "  # Plot FFT and dominant frequency\n",
        "  plt.figure(figsize=(10,3))\n",
        "  plt.plot(positive_freqs, magnitude_spectrum)\n",
        "  plt.plot(positive_freqs[max_peak_index], magnitude_spectrum[max_peak_index], 'ro')\n",
        "  plt.title(f\"FFT - HR: {heart_rate:.2f} bpm\")\n",
        "  plt.xlabel(\"Frequency (Hz)\")\n",
        "  plt.grid()\n",
        "  plt.xlim(0.5, 5)\n",
        "  plt.tight_layout()\n",
        "  plt.show()\n",
        "\n",
        "  print(f\"Estimated HR: {heart_rate:.2f} bpm\")\n",
        "  return heart_rate\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bYUpm7R7_E7H"
      },
      "source": [
        "## Find Pulse Peaks"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 15,
      "metadata": {
        "id": "GGijTEk8_GXj"
      },
      "outputs": [],
      "source": [
        "def funcFindPeaks(ppg_signal):\n",
        "  peaks = signal.find_peaks(ppg_signal)[0]\n",
        "  return peaks"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JqT437e3DCke"
      },
      "source": [
        "### Remove artifact and diastolic peaks:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 38,
      "metadata": {
        "id": "ejaz-pAzRPNF"
      },
      "outputs": [],
      "source": [
        "def funRemoveUpStatePeaks(ppg_signal, locs_peaks_ppg, HR, fs):\n",
        "  thd = int((60/HR)*fs/4)\n",
        "  correct_locs_peaks_ppg = []\n",
        "  correct_locs_peaks_ppg.append(locs_peaks_ppg[0])\n",
        "  for loc in range(1, len(locs_peaks_ppg)):\n",
        "    if (locs_peaks_ppg[loc]+thd) < len(ppg_signal):\n",
        "        if ppg_signal[locs_peaks_ppg[loc]+thd] < ppg_signal[locs_peaks_ppg[loc]]:\n",
        "            correct_locs_peaks_ppg.append(locs_peaks_ppg[loc])\n",
        "    else:\n",
        "        correct_locs_peaks_ppg.append(locs_peaks_ppg[loc])\n",
        "  return np.array(correct_locs_peaks_ppg)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 39,
      "metadata": {
        "id": "JUyDxB1SQXpQ"
      },
      "outputs": [],
      "source": [
        "def funcRemoveDiastolicPeaks(ppg_signal, locs_peaks_ppg, HR, fs): # \"funRemoveDownStatePeaks\"\n",
        "  thd = int((60/HR)*fs/4)\n",
        "  locs_systolic_peaks_ppg = []\n",
        "  locs_systolic_peaks_ppg.append(locs_peaks_ppg[0])\n",
        "  for loc in range(1, len(locs_peaks_ppg)):\n",
        "    if ppg_signal[locs_peaks_ppg[loc]-thd] < ppg_signal[locs_peaks_ppg[loc]]:\n",
        "      locs_systolic_peaks_ppg.append(locs_peaks_ppg[loc])\n",
        "  return np.array(locs_systolic_peaks_ppg)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 34,
      "metadata": {
        "id": "nd25ogLYUaSy"
      },
      "outputs": [],
      "source": [
        "def funcFindSystolicPeaks(ppg_signal, fs, HR):\n",
        "  locs_peaks_ppg = funcFindPeaks(ppg_signal)\n",
        "  locs_peaks_ppg = funRemoveUpStatePeaks(ppg_signal, locs_peaks_ppg, HR, fs)\n",
        "  locs_systolic_peaks_ppg = funcRemoveDiastolicPeaks(ppg_signal, locs_peaks_ppg, HR, fs)\n",
        "  return locs_systolic_peaks_ppg"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "oALroZzWV0QK"
      },
      "source": [
        "## Find Pulse Onsets"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 32,
      "metadata": {
        "id": "gh0k9XIpV8Z4"
      },
      "outputs": [],
      "source": [
        "def funcDetectPeakOnsets(ppg_signal, locs_peaks_ppg):\n",
        "  locs_onsets_ppg = []\n",
        "  for i in range(len(locs_peaks_ppg)-1):\n",
        "    locs_onsets_ppg.append(np.argmin(ppg_signal[locs_peaks_ppg[i]:locs_peaks_ppg[i+1]])+locs_peaks_ppg[i])\n",
        "  return np.array(locs_onsets_ppg)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uCxH5DQa8TCE"
      },
      "source": [
        "## Measure HBI (Heart Beat Interval)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 20,
      "metadata": {
        "id": "a5CMzPH78XK8"
      },
      "outputs": [],
      "source": [
        "def funcHBI(locs_peaks_ppg):\n",
        "  HBI = [j-i for i,j in zip(locs_peaks_ppg[0:-1], locs_peaks_ppg[1:])]\n",
        "  return HBI"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "isc2sqmr8YIp"
      },
      "source": [
        "## Measure PPGA (PPG Amplitude)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 44,
      "metadata": {
        "id": "Z8D_Z40x8V7A"
      },
      "outputs": [],
      "source": [
        "def funcPPGA(ppg_signal, locs_peaks_ppg, locs_onsets_ppg):\n",
        "  if locs_onsets_ppg[0]>locs_peaks_ppg[0]:\n",
        "    locs_peaks_ppg = locs_peaks_ppg[1:]\n",
        "  PPGA = [int(ppg_signal[peak]-ppg_signal[onset]) for peak,onset in zip(locs_peaks_ppg, locs_onsets_ppg)]\n",
        "  return PPGA"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "49ANi1fxdAkR"
      },
      "source": [
        "## Graph Time Series"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 36,
      "metadata": {
        "id": "r_KToFjddHr2"
      },
      "outputs": [],
      "source": [
        "def funcPlotTimeSeries(array, label, title):\n",
        "  plt.figure(figsize=(10,3))\n",
        "  plt.plot(array)\n",
        "  plt.title(title)\n",
        "  plt.xlabel(\"Pulse Number\")\n",
        "  plt.ylabel(label)\n",
        "  plt.grid()\n",
        "  plt.tight_layout()\n",
        "  plt.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3q4i-QWmjanO"
      },
      "source": [
        "# Signals"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 4,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "jDjZ7Oour01s",
        "outputId": "0aacfef8-658f-4202-b952-35af1d448f8c"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount(\"/content/drive\", force_remount=True).\n"
          ]
        }
      ],
      "source": [
        "from google.colab import drive\n",
        "drive.mount('/content/drive')"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 5,
      "metadata": {
        "id": "F8dLYZQ5aC_C"
      },
      "outputs": [],
      "source": [
        "dataset_path = '/content/drive/MyDrive/PPG_Signal_Dataset/dataset'"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 1000
        },
        "id": "rw5ZUn8vsbC9",
        "outputId": "309d8bb2-ff90-4f3f-d904-c7a61d4f090e"
      },
      "outputs": [],
      "source": [
        "fs = 500\n",
        "\n",
        "for filename in os.listdir(dataset_path):\n",
        "  subjectcode = filename.split('.')[0]\n",
        "  print(subjectcode)\n",
        "\n",
        "  # Load the signal form the csv\n",
        "  path_ppg = os.path.join(dataset_path, filename)\n",
        "  ppg_signal = funcReadSignal(path_ppg)\n",
        "\n",
        "  # Plot the signal\n",
        "  funcVisualizeSignal(ppg_signal = ppg_signal, fs = 500, label = subjectcode, scatterX = 0, scatterY = 0, scatter = False)\n",
        "\n",
        "  # Estimate the heart rate based on the signal's spectrum\n",
        "  HR = funcEstimateHR(ppg_signal=ppg_signal, f1=0.1, f2=5, fs=fs) # 30bpm to 240 bpm = 0.5 to 4 Hz --> filter from 0.1 to 5 Hz\n",
        "\n",
        "  # HP Filter to remove the drift and LP Filter to remove high frequency noise (IIR Butterworth order 3 filters with cut off frequencies adaptative to the subject's heart rate)\n",
        "  ppg_signal_filtered = funcFilter(ppg_signal=ppg_signal, fs=fs, HR=HR, order=3)\n",
        "  funcVisualizeSignal(ppg_signal=ppg_signal_filtered, fs=fs, label=f\"Filtered {subjectcode}\")\n",
        "\n",
        "  # Find systolic peaks\n",
        "  locs_systolic_peaks_ppg = funcFindSystolicPeaks(ppg_signal=ppg_signal_filtered, fs=fs, HR=HR)\n",
        "  funcVisualizeSignal(ppg_signal=ppg_signal_filtered, fs=fs, label=f\"Systolic Peaks {subjectcode}\", scatterX = locs_systolic_peaks_ppg, scatterY = ppg_signal_filtered[locs_systolic_peaks_ppg], scatter = True)\n",
        "\n",
        "  # Find systolic peaks onsets\n",
        "  locs_onsets_ppg = funcDetectPeakOnsets(ppg_signal=ppg_signal_filtered, locs_peaks_ppg=locs_systolic_peaks_ppg)\n",
        "  funcVisualizeSignal(ppg_signal=ppg_signal_filtered, fs=fs, label=f\"Systolic Peaks Onsets {subjectcode}\", scatterX = locs_onsets_ppg, scatterY = ppg_signal_filtered[locs_onsets_ppg], scatter = True)\n",
        "\n",
        "  # Obtain pulse amplitudes and heart beat intervals\n",
        "  PPGA = funcPPGA(ppg_signal=ppg_signal_filtered, locs_peaks_ppg=locs_systolic_peaks_ppg, locs_onsets_ppg=locs_onsets_ppg)\n",
        "  HBI = funcHBI(locs_peaks_ppg=locs_systolic_peaks_ppg)\n",
        "\n",
        "  # Plot time series\n",
        "  funcPlotTimeSeries(PPGA, label=\"PPGA\", title=f\"PPGA Time Series {subjectcode}\")\n",
        "  funcPlotTimeSeries(HBI, label=\"HBI\", title=f\"HBI Time Series {subjectcode}\")\n",
        "\n",
        "  print()"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [
        "pKdV9-AIu6nT",
        "ZL1w5O-Y7Ra3",
        "qTZ5WrUz7roL",
        "uT96dxoK83nE",
        "-7r2NW4C8696",
        "7kkRENKNOVAf",
        "HrSxtABTNPJU",
        "bYUpm7R7_E7H",
        "JqT437e3DCke",
        "oALroZzWV0QK",
        "uCxH5DQa8TCE",
        "isc2sqmr8YIp",
        "49ANi1fxdAkR"
      ],
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
